Feat[OBE-10264] - Add topology buffer delay metric #113

Open
janmejay-s1 wants to merge 9 commits into master from OBE-10264__add_queue_delay_metric

@janmejay-s1

Contributor

Summary

Adds topology_queue_delay_seconds, a per-component histogram that measures how long
each event-batch sits in a topology buffer before being consumed. The metric is tagged
with component_id (consumer) and stage (0 for base, 1 for disk-overflow), letting
operators pinpoint which sink or transform is slow without inferring it from
backpressure or send-duration.

Wired through every buffer in the topology — sink buffers (memory or disk_v2,
including chained memory→disk overflow), inter-transform buffers, fanout buffers, and
tap buffers. Mandatory at the type level: BufferReceiver::new / with_overflow
require Registered<BufferQueueDelay> + Arc<dyn Clock> as constructor params, so a
future construction path can't silently skip it.

What changed

Wire format

  • event.proto: added google.protobuf.Timestamp enq_tm = 4 to EventArray. The field
    is optional (an absent field decodes as None), so pre-upgrade disk records decode
    without a timestamp and report zero queue delay until those records have drained.
  • proto.rs: the symmetric From conversions stay unchanged, so the native codec and
    vector↔vector RPC wire output are unaffected.
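
To illustrate why pre-upgrade records still decode, here is a minimal hand-rolled sketch of protobuf field parsing. It is not the prost-generated code from this PR, and field 1 carrying raw bytes is a hypothetical stand-in for the real EventArray payload. A record that lacks tag 4 simply leaves enq_tm as None.

```rust
/// Value of a single decoded field (only the wire types this sketch needs).
enum Value<'a> {
    Varint(u64),
    Bytes(&'a [u8]),
}

/// Reads a base-128 varint, returning (value, bytes consumed).
fn read_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, &b) in buf.iter().enumerate().take(10) {
        value |= u64::from(b & 0x7f) << (7 * i);
        if b & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Reads one field, returning (field number, value, remaining bytes).
/// Wire types other than varint (0) and length-delimited (2) are rejected.
fn read_field(buf: &[u8]) -> Option<(u64, Value<'_>, &[u8])> {
    let (key, n) = read_varint(buf)?;
    let rest = &buf[n..];
    match key & 7 {
        0 => {
            let (v, m) = read_varint(rest)?;
            Some((key >> 3, Value::Varint(v), &rest[m..]))
        }
        2 => {
            let (len, m) = read_varint(rest)?;
            let end = m.checked_add(usize::try_from(len).ok()?)?;
            let body = rest.get(m..end)?;
            Some((key >> 3, Value::Bytes(body), &rest[end..]))
        }
        _ => None,
    }
}

#[derive(Debug, PartialEq)]
struct Decoded {
    payload: Vec<u8>,
    /// (seconds, nanos), mirroring google.protobuf.Timestamp.
    enq_tm: Option<(i64, i32)>,
}

/// Decodes an embedded Timestamp message: field 1 = seconds, field 2 = nanos.
fn decode_timestamp(mut buf: &[u8]) -> Option<(i64, i32)> {
    let (mut secs, mut nanos) = (0i64, 0i32);
    while !buf.is_empty() {
        let (num, val, rest) = read_field(buf)?;
        if let Value::Varint(v) = val {
            match num {
                1 => secs = v as i64,
                2 => nanos = v as i32,
                _ => {}
            }
        }
        buf = rest;
    }
    Some((secs, nanos))
}

/// Decodes a record; tag 4 is optional, so legacy bytes yield `enq_tm: None`.
fn decode_record(mut buf: &[u8]) -> Option<Decoded> {
    let mut out = Decoded { payload: Vec::new(), enq_tm: None };
    while !buf.is_empty() {
        let (num, val, rest) = read_field(buf)?;
        match (num, val) {
            (1, Value::Bytes(b)) => out.payload = b.to_vec(),
            (4, Value::Bytes(b)) => out.enq_tm = Some(decode_timestamp(b)?),
            _ => {} // unknown fields are skipped
        }
        buf = rest;
    }
    Some(out)
}
```

Feeding it the bytes 0a 02 68 69 (field 1 only) gives enq_tm: None. Appending 22 04 08 05 10 07 (field 4 with seconds 5, nanos 7) gives Some((5, 7)).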

Wrapper + clock

  • vector-buffers::Timed<T> { inner, enq_tm: Option<SystemTime> }. Delegates
    ByteSizeOf / EventCount / AddBatchNotifier / Finalizable to T.
  • Clock trait with SystemClock as the default implementation. Tests use a
    TokioBackedClock backed by tokio::time::Instant, so paused time gives exact,
    deterministic timing assertions.
  • elapsed saturates: it returns Duration::ZERO when the clock steps backwards and
    when enq_tm is None.
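
A minimal sketch of the wrapper and clock described above, using only the standard library. The method signatures and the FixedClock test helper are assumptions for illustration; the PR's own tests use TokioBackedClock instead.

```rust
use std::time::{Duration, SystemTime};

/// Source of wall-clock time, injectable so tests can control it.
pub trait Clock: Send + Sync {
    fn now(&self) -> SystemTime;
}

/// Default clock backed by the operating system.
pub struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}

/// Hypothetical test clock that always reports the same instant.
pub struct FixedClock(pub SystemTime);

impl Clock for FixedClock {
    fn now(&self) -> SystemTime {
        self.0
    }
}

/// Payload plus the time it entered the buffer, if known.
pub struct Timed<T> {
    pub inner: T,
    pub enq_tm: Option<SystemTime>,
}

impl<T> Timed<T> {
    /// Wraps `inner` and stamps it with the clock's current time.
    pub fn stamped(inner: T, clock: &dyn Clock) -> Self {
        Self { inner, enq_tm: Some(clock.now()) }
    }

    /// Wraps `inner` without a timestamp (for example, a legacy disk record).
    pub fn untimed(inner: T) -> Self {
        Self { inner, enq_tm: None }
    }

    /// Time spent in the buffer. Saturates to zero when the timestamp is
    /// missing or when the clock has stepped backwards since enqueue.
    pub fn elapsed(&self, clock: &dyn Clock) -> Duration {
        match self.enq_tm {
            Some(t) => clock.now().duration_since(t).unwrap_or(Duration::ZERO),
            None => Duration::ZERO,
        }
    }
}
```

SystemTime::duration_since returns an error when the argument is later than the receiver, which is what makes the unwrap_or fallback cover the NTP-backstep case.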

Encodable, via blanket-impl pattern (no orphan rule violation)

  • New TimedEncodable: Encodable trait in vector-buffers.
  • Blanket impl<T: TimedEncodable> Encodable for Timed<T>: Timed<T> gets Encodable
    for free when the inner T can describe how to attach the timestamp.
  • Blanket impl<T: FixedEncodable> TimedEncodable for T — pass-through (drops
    timestamp on encode, returns None on decode) so test types using FixedEncodable
    need no code changes.
  • impl TimedEncodable for EventArray in vector-core — uses the new proto field;
    this is the only place that actually serializes the timestamp on the wire. Orphan
    rule is satisfied because EventArray is local to vector-core.
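
The trait layering above can be sketched as follows. The trait methods here are simplified assumptions (byte vectors instead of the real buffer types, and no Encodable supertrait on TimedEncodable), and Batch is a hypothetical stand-in for EventArray.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub trait Encodable: Sized {
    fn encode(self, out: &mut Vec<u8>);
    fn decode(buf: &[u8]) -> Option<Self>;
}

pub trait FixedEncodable: Sized {
    fn encode_fixed(self, out: &mut Vec<u8>);
    fn decode_fixed(buf: &[u8]) -> Option<Self>;
}

/// Types that know how to carry an optional enqueue timestamp on the wire.
pub trait TimedEncodable: Sized {
    fn encode_timed(self, enq_tm: Option<SystemTime>, out: &mut Vec<u8>);
    fn decode_timed(buf: &[u8]) -> Option<(Self, Option<SystemTime>)>;
}

#[derive(Debug, PartialEq)]
pub struct Timed<T> {
    pub inner: T,
    pub enq_tm: Option<SystemTime>,
}

// Timed<T> is encodable whenever the inner type can attach the timestamp.
impl<T: TimedEncodable> Encodable for Timed<T> {
    fn encode(self, out: &mut Vec<u8>) {
        self.inner.encode_timed(self.enq_tm, out);
    }
    fn decode(buf: &[u8]) -> Option<Self> {
        let (inner, enq_tm) = T::decode_timed(buf)?;
        Some(Timed { inner, enq_tm })
    }
}

// Pass-through: fixed-format types drop the timestamp and decode it as None.
impl<T: FixedEncodable> TimedEncodable for T {
    fn encode_timed(self, _enq_tm: Option<SystemTime>, out: &mut Vec<u8>) {
        self.encode_fixed(out);
    }
    fn decode_timed(buf: &[u8]) -> Option<(Self, Option<SystemTime>)> {
        T::decode_fixed(buf).map(|value| (value, None))
    }
}

// A test-style type that only implements FixedEncodable.
impl FixedEncodable for u32 {
    fn encode_fixed(self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.to_le_bytes());
    }
    fn decode_fixed(buf: &[u8]) -> Option<Self> {
        if buf.len() != 4 {
            return None;
        }
        Some(u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]))
    }
}

/// Hypothetical payload that serializes the timestamp (whole seconds only).
#[derive(Debug, PartialEq)]
pub struct Batch(pub u8);

impl TimedEncodable for Batch {
    fn encode_timed(self, enq_tm: Option<SystemTime>, out: &mut Vec<u8>) {
        out.push(self.0);
        if let Some(t) = enq_tm {
            let secs = t.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_secs();
            out.extend_from_slice(&secs.to_le_bytes());
        }
    }
    fn decode_timed(buf: &[u8]) -> Option<(Self, Option<SystemTime>)> {
        let (&first, rest) = buf.split_first()?;
        let enq_tm = match rest.len() {
            0 => None,
            8 => {
                let mut raw = [0u8; 8];
                raw.copy_from_slice(rest);
                Some(UNIX_EPOCH.checked_add(Duration::from_secs(u64::from_le_bytes(raw)))?)
            }
            _ => return None,
        };
        Some((Batch(first), enq_tm))
    }
}
```

The direct impl for Batch does not conflict with the blanket impl because Batch does not implement FixedEncodable. Round-tripping Timed<u32> loses the timestamp, while Timed<Batch> preserves it.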

Buffer plumbing

  • SenderAdapter / ReceiverAdapter channels carry Timed<T> internally; external
    API still takes/returns T.
  • BufferSender::send wraps via Timed::stamped; send_stamped is an internal
    recursive helper that preserves enq_tm through overflow.
  • BufferReceiver::next unwraps, calls with_telemetry (histogram emit + existing
    buffer-usage counter), returns inner T.
  • New BufferQueueDelay registered_event in vector-buffers/src/internal_events.rs
    emits topology_queue_delay_seconds tagged with component_id + stage.
  • BufferConfig::build / TopologyBuilder::build / standalone_memory all gain
    impl Into<ComponentKey> IDs and a sibling build_with_clock for tests.
  • Follow-up included: buffer_send_duration_seconds
    (with_send_duration_instrumentation setter) is also made a required constructor
    param so it can't be silently skipped either.
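
As a rough sketch of the plumbing above, the following uses std::sync::mpsc in place of the real buffer channels. QueueDelayRecorder stands in for Registered<BufferQueueDelay>, and ManualClock, the buffer function, and all signatures are assumptions for illustration. The point is that the channel carries Timed<T> internally while callers only see T, and that the receiver cannot be built without a recorder and a clock.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

pub trait Clock: Send + Sync {
    fn now(&self) -> SystemTime;
}

/// Hypothetical test clock that only moves when told to.
pub struct ManualClock(Mutex<SystemTime>);

impl ManualClock {
    pub fn new(start: SystemTime) -> Self {
        Self(Mutex::new(start))
    }
    pub fn advance(&self, by: Duration) {
        *self.0.lock().unwrap() += by;
    }
}

impl Clock for ManualClock {
    fn now(&self) -> SystemTime {
        *self.0.lock().unwrap()
    }
}

/// Stand-in for the registered histogram handle: it just collects samples.
#[derive(Clone, Default)]
pub struct QueueDelayRecorder(Arc<Mutex<Vec<Duration>>>);

impl QueueDelayRecorder {
    fn record(&self, delay: Duration) {
        self.0.lock().unwrap().push(delay);
    }
    pub fn samples(&self) -> Vec<Duration> {
        self.0.lock().unwrap().clone()
    }
}

/// Internal wrapper; never exposed through the public API below.
struct Timed<T> {
    inner: T,
    enq_tm: Option<SystemTime>,
}

pub struct BufferSender<T> {
    tx: Sender<Timed<T>>,
    clock: Arc<dyn Clock>,
}

pub struct BufferReceiver<T> {
    rx: Receiver<Timed<T>>,
    delay: QueueDelayRecorder,
    clock: Arc<dyn Clock>,
}

/// The recorder and clock are required arguments, so no construction path
/// can produce a receiver that skips the metric.
pub fn buffer<T>(
    delay: QueueDelayRecorder,
    clock: Arc<dyn Clock>,
) -> (BufferSender<T>, BufferReceiver<T>) {
    let (tx, rx) = channel();
    let sender = BufferSender { tx, clock: Arc::clone(&clock) };
    (sender, BufferReceiver { rx, delay, clock })
}

impl<T> BufferSender<T> {
    /// Stamps the item on the way in; returns false if the receiver is gone.
    pub fn send(&self, item: T) -> bool {
        let timed = Timed { inner: item, enq_tm: Some(self.clock.now()) };
        self.tx.send(timed).is_ok()
    }
}

impl<T> BufferReceiver<T> {
    /// Unwraps the next item, records its queue delay, and returns the payload.
    pub fn next(&self) -> Option<T> {
        let timed = self.rx.try_recv().ok()?;
        let waited = timed
            .enq_tm
            .and_then(|t| self.clock.now().duration_since(t).ok())
            .unwrap_or(Duration::ZERO);
        self.delay.record(waited);
        Some(timed.inner)
    }
}
```

Sending an item, advancing the manual clock by two seconds, and then calling next records a single two-second sample.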

Backwards compat — verified, not assumed

  • A real legacy disk-buffer fixture (lib/vector-core/tests/data/fixtures/legacy_disk_buffer_v2/)
    was generated on master, before the proto change, and committed. The generator
    test (legacy_disk_buffer_fixture_gen.rs) stays gated #[ignore] for
    regeneration if the ledger format ever changes.

Tests

| Test | What it proves |
| --- | --- |
| event::ser::tests (4) | Encodable round-trip with/without timestamp; bare-EventArray decoder ignores tag 4; pre-tag-4 bytes decode as Timed { enq_tm: None } |
| timed::tests (5) | Stamped/untimed construction, elapsed computation, NTP-backstep returns ZERO |
| buffer_disk_v2_compat::tests (3) | T2/T3/T4a: payload correctness through new buffer; legacy fixture decodes; mixed-records FIFO drain |
| buffer_histogram_demo::tests (3) | Histogram emits exact values for in-mem / disk / overflow with paused-clock + TokioBackedClock |
| topology::test::queue_delay (1) | T5: demo_logs → remap → remap → blackhole topology loaded from TOML; per-component_id histograms verified via Controller::capture_metrics() |

@janmejay-s1

Contributor Author

Manual test

{
  "name": "buffer_send_duration_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "test_xform",
    "component_kind": "transform",
    "component_type": "remap",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:22.679285Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 1073
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 1073,
    "sum": 0.14668267000000013
  }
}
{
  "name": "buffer_send_duration_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "hec_out",
    "component_kind": "sink",
    "component_type": "splunk_hec_logs",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:22.679285Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 1073
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 1073,
    "sum": 0.008903986000000008
  }
}
{
  "name": "topology_queue_delay_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "test_xform",
    "component_kind": "transform",
    "component_type": "remap",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:22.679285Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 1073
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 1073,
    "sum": 0.028674000000000113
  }
}
{
  "name": "buffer_send_duration_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "hec_out",
    "component_kind": "sink",
    "component_type": "splunk_hec_logs",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:37.680071Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 2000
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 2000,
    "sum": 0.016845860999999986
  }
}
{
  "name": "topology_queue_delay_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "test_xform",
    "component_kind": "transform",
    "component_type": "remap",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:37.680071Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 2000
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 2000,
    "sum": 0.0531560000000003
  }
}
{
  "name": "buffer_send_duration_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "file_log_sink",
    "component_kind": "sink",
    "component_type": "file",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:37.680071Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 3
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 3,
    "sum": 0.000417375
  }
}
{
  "name": "topology_queue_delay_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "file_log_sink",
    "component_kind": "sink",
    "component_type": "file",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:37.680071Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 3
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 3,
    "sum": 0.000175
  }
}
{
  "name": "topology_queue_delay_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "hec_out",
    "component_kind": "sink",
    "component_type": "splunk_hec_logs",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:37.680071Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 1510
      },
      {
        "upper_limit": 0.03125,
        "count": 1
      },
      {
        "upper_limit": 0.0625,
        "count": 2
      },
      {
        "upper_limit": 0.125,
        "count": 4
      },
      {
        "upper_limit": 0.25,
        "count": 7
      },
      {
        "upper_limit": 0.5,
        "count": 17
      },
      {
        "upper_limit": 1.0,
        "count": 31
      },
      {
        "upper_limit": 2.0,
        "count": 63
      },
      {
        "upper_limit": 4.0,
        "count": 132
      },
      {
        "upper_limit": 8.0,
        "count": 233
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 2000,
    "sum": 1870.2209169999917
  }
}
{
  "name": "buffer_send_duration_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "memory",
    "component_id": "test_xform",
    "component_kind": "transform",
    "component_type": "remap",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:35:37.680071Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 2000
      },
      {
        "upper_limit": 0.03125,
        "count": 0
      },
      {
        "upper_limit": 0.0625,
        "count": 0
      },
      {
        "upper_limit": 0.125,
        "count": 0
      },
      {
        "upper_limit": 0.25,
        "count": 0
      },
      {
        "upper_limit": 0.5,
        "count": 0
      },
      {
        "upper_limit": 1.0,
        "count": 0
      },
      {
        "upper_limit": 2.0,
        "count": 0
      },
      {
        "upper_limit": 4.0,
        "count": 0
      },
      {
        "upper_limit": 8.0,
        "count": 0
      },
      {
        "upper_limit": 16.0,
        "count": 0
      },
      {
        "upper_limit": 32.0,
        "count": 0
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 2000,
    "sum": 0.27113599400000005
  }
}

@janmejay-s1

Contributor Author

Disk buffer queue-delay

cat /tmp/metrics.out.json | grep -P 'topology_queue_delay_seconds|buffer_send_duration_seconds' | jq -c 'select(.name == "topology_queue_delay_seconds" and .tags.buffer_type == "disk")' | tail -1 | jq '.'
{
  "name": "topology_queue_delay_seconds",
  "namespace": "vector",
  "tags": {
    "buffer_type": "disk",
    "component_id": "hec_out",
    "component_kind": "sink",
    "component_type": "splunk_hec_logs",
    "host": "HV9JX62PP7",
    "stage": "0"
  },
  "timestamp": "2026-06-12T18:46:47.881244Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      {
        "upper_limit": 0.015625,
        "count": 966
      },
      {
        "upper_limit": 0.03125,
        "count": 1
      },
      {
        "upper_limit": 0.0625,
        "count": 2
      },
      {
        "upper_limit": 0.125,
        "count": 4
      },
      {
        "upper_limit": 0.25,
        "count": 7
      },
      {
        "upper_limit": 0.5,
        "count": 16
      },
      {
        "upper_limit": 1.0,
        "count": 31
      },
      {
        "upper_limit": 2.0,
        "count": 63
      },
      {
        "upper_limit": 4.0,
        "count": 133
      },
      {
        "upper_limit": 8.0,
        "count": 261
      },
      {
        "upper_limit": 16.0,
        "count": 500
      },
      {
        "upper_limit": 32.0,
        "count": 16
      },
      {
        "upper_limit": 64.0,
        "count": 0
      },
      {
        "upper_limit": 128.0,
        "count": 0
      },
      {
        "upper_limit": 256.0,
        "count": 0
      },
      {
        "upper_limit": 512.0,
        "count": 0
      },
      {
        "upper_limit": 1024.0,
        "count": 0
      },
      {
        "upper_limit": 2048.0,
        "count": 0
      },
      {
        "upper_limit": 4096.0,
        "count": 0
      },
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 2000,
    "sum": 8361.699766999944
  }
}
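
For reading a histogram like the one above, here is a small sketch that derives the mean and a quantile upper bound from the bucket data. It assumes the bucket counts are per-bucket rather than cumulative, which matches the output above since the counts add up to the reported count. The helper names and the DISK_HEC_OUT constant are made up for this example.

```rust
/// (upper_limit, count) pairs copied from the disk-buffer output above;
/// the trailing all-zero buckets are omitted.
const DISK_HEC_OUT: &[(f64, u64)] = &[
    (0.015625, 966),
    (0.03125, 1),
    (0.0625, 2),
    (0.125, 4),
    (0.25, 7),
    (0.5, 16),
    (1.0, 31),
    (2.0, 63),
    (4.0, 133),
    (8.0, 261),
    (16.0, 500),
    (32.0, 16),
];

/// Mean delay in seconds, or None when there are no samples.
fn mean(sum: f64, count: u64) -> Option<f64> {
    if count == 0 {
        None
    } else {
        Some(sum / count as f64)
    }
}

/// Upper limit of the first bucket at which the running count reaches the
/// rank for quantile `q`. This is an upper bound, not an interpolated value.
fn quantile_upper_bound(buckets: &[(f64, u64)], q: f64) -> Option<f64> {
    let total: u64 = buckets.iter().map(|&(_, count)| count).sum();
    if total == 0 || !(0.0..=1.0).contains(&q) {
        return None;
    }
    let rank = ((q * total as f64).ceil() as u64).max(1);
    let mut seen = 0u64;
    for &(upper, count) in buckets {
        seen += count;
        if seen >= rank {
            return Some(upper);
        }
    }
    None
}
```

With the numbers above, mean(8361.699766999944, 2000) is roughly 4.18 seconds, half of the batches waited at most 1 second, and 99% waited at most 16 seconds.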

janmejay-s1 requested a review from jsbalis1 on June 12, 2026, 18:50